# Copyright (c) HySoP 2011-2024
#
# This file is part of HySoP software.
# See "https://particle_methods.gricad-pages.univ-grenoble-alpes.fr/hysop-doc/"
# for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
from hysop import vprint, dprint, Problem
from hysop.fields.continuous_field import ScalarField
from hysop.tools.htypes import check_instance, first_not_None
from hysop.tools.io_utils import IOParams
from hysop.tools.transposition_states import TranspositionState
from hysop.constants import MemoryOrdering, Backend
from hysop.parameters.parameter import Parameter
from hysop.topology.cartesian_topology import CartesianTopologyState
from hysop.core.graph.graph import (
new_directed_graph,
new_vertex,
new_edge,
is_directed_acyclic_graph,
transitive_reduction,
lexicographical_topological_sort,
all_simple_paths,
)
from hysop.core.graph.computational_graph import ComputationalGraph
from hysop.core.graph.computational_node import ComputationalGraphNode
from hysop.core.graph.computational_operator import ComputationalGraphOperator
from hysop.fields.field_requirements import (
DiscreteFieldRequirements,
MultiFieldRequirements,
)
from hysop.operator.redistribute import (
Redistribute,
RedistributeInter,
RedistributeInterParam,
RedistributeNotImplementedError,
)
from hysop.operator.transpose import Transpose, TranspositionNotImplementedError
from hysop.operator.memory_reordering import (
MemoryReordering,
MemoryReorderingNotImplementedError,
)
from hysop.tools.sympy_utils import subscript
# Debug level for graph building
# 0: no debug logs
# 1: print debug info about input/outputs and operator generation
# 2: print debug info about topology states in addition
GRAPH_BUILDER_DEBUG_LEVEL = 0
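# Note (assumed typical usage): setting GRAPH_BUILDER_DEBUG_LEVEL = 2 here before
# building any ComputationalGraph also traces per-field topology state transitions
# through gprint2 below; level 1 only traces inputs/outputs and operator generation.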
def gprint(*args, **kwds):
level = kwds.pop("level", 1)
if GRAPH_BUILDER_DEBUG_LEVEL >= level:
print(*args, **kwds)
def gprint2(*args, **kwds):
kwds["level"] = 2
gprint(*args, **kwds)
class GraphBuilder:
"""
Helper class to build graph of computational operators (see ComputationalGraph).
"""
def __init__(self, node):
"""
Initialize a GraphBuilder with given target ComputationalGraph.
Parameters
----------
node: hysop.core.graph.ComputationalGraph
target node that should generate its graph
Notes
-----
This class is a helper for the ComputationalGraph graph building step.
It should not be used outside of ComputationalGraph.
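Expected usage, as inferred from this module: the owning ComputationalGraph
sets the current_level, outputs_are_inputs and search_intertasks_ops
attributes on the builder, then calls setup_graph(), setup_variables()
and finally build_graph().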
"""
check_instance(node, ComputationalGraph)
self.target_node = node
gprint("::Graph builder::")
gprint(f">Initialized graph builder for ComputationalGraph {node.name}")
def setup_graph(self):
self.graph = new_directed_graph()
def setup_variables(self):
self.input_fields = {}
self.output_fields = {}
self.input_params = {}
self.output_params = {}
self.topology_states = {}
self.input_topology_states = {}
self.output_topology_states = {}
self.op_input_topology_states = {}
self.op_output_topology_states = {}
def new_topology_state(self, field):
return self.__ContinuousFieldState(
field,
self.op_input_topology_states,
self.op_output_topology_states,
self.target_node._input_fields_to_dump,
self.target_node.method,
)
def new_node(
self, op, subgraph, current_level, node, node_id, opvertex, insert_at=-1
):
# Add a new operator node to the graph. The optional insert_at argument controls the
# position of the new node in the node list (node_id of following nodes is shifted accordingly).
graph = self.graph
opnode = new_vertex(graph, op).copy_attributes(opvertex)
gprint(f" *Created node is {int(opnode)}." + op.name)
if insert_at >= 0:
for _ in graph.nodes:
if _.node_id > insert_at:
_.node_id += 1
opnode.node_id = insert_at + 1
gprint(f" *Created moved to {int(opnode)}.")
return opnode
def build_graph(self):
target_node = self.target_node
current_level = self.current_level
outputs_are_inputs = self.outputs_are_inputs
graph = self.graph
parameter_handler = self.__ParameterHandler(graph)
input_fields = self.input_fields
output_fields = self.output_fields
input_params = self.input_params
output_params = self.output_params
input_topology_states = self.input_topology_states
output_topology_states = self.output_topology_states
op_input_topology_states = self.op_input_topology_states
op_output_topology_states = self.op_output_topology_states
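# Per-build bookkeeping (roles inferred from their usage below):
# _deferred_operators: (op, opnode) pairs whose input topologies could not be
# determined yet and are retried once the whole node list has been processed.
# _double_check_inputs: field -> {topology: (requirements, state)} for every
# topology a field has been read on, checked again at closure time.
# _redistribute_inter: list of inter-task redistributes.
# _intertasks_exchanged: names of fields/parameters already exchanged between tasks.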
self._deferred_operators = []
self._double_check_inputs = {}
self._redistribute_inter = []
self._intertasks_exchanged = set()
# check that all target nodes are unique to prevent conflicts
if len(set(target_node.nodes)) != len(target_node.nodes):
duplicates = {
x for x in target_node.nodes if target_node.nodes.count(x) > 1
}
msg = (
"\n\nFATAL ERROR: ComputationalGraph {} contains mutiple references to "
)
msg += "the same nodes.\n"
msg += "Concerned operators are:\n"
for op in duplicates:
msg0 = " *Operator {:12s} (cls={:30s} | id={}): {} occurences\n"
msg0 = msg0.format(
op.name, type(op).__name__, id(op), target_node.nodes.count(op)
)
msg += msg0
msg = msg.format(target_node.name)
raise RuntimeError(msg)
def __handle_node(
node_id,
node,
subgraph,
node_ops,
node_vertices,
from_subgraph,
opvertex,
op,
opnode,
):
gprint(f" *{op.name} ({type(op)})")
opname = op.name
oppname = op.pretty_name
iparams = op.input_params
oparams = op.output_params
ifields = op.input_fields
ofields = op.output_fields
field_requirements = op._field_requirements
if field_requirements is None:
op.get_and_set_field_requirements()
field_requirements = op._field_requirements
if isinstance(op, RedistributeInter) or isinstance(
op, RedistributeInterParam
):
self._intertasks_exchanged = self._intertasks_exchanged.union(
{_.name for _ in list(op.output_fields) + list(output_params)}
)
self._intertasks_exchanged = self._intertasks_exchanged.union(
{_.name for _ in list(op.input_fields) + list(op.input_params)}
)
if not isinstance(op, Problem) and not isinstance(op, RedistributeInter):
# try to fill in undetermined topologies (experimental feature)
backends = op.supported_backends()
for ifield, itopo in sorted(ifields.items(), key=lambda x: x[0].name):
if itopo is not None:
continue
# look for ifield usage until now
if (
(ifield in ofields)
and (ofields[ifield] is not None)
and (ofields[ifield].backend.kind in backends)
):
ifields[ifield] = ofields[ifield]
elif ifield not in self.topology_states:
if outputs_are_inputs:
# we can try to push this operator after we're done
self._deferred_operators.append((op, opnode))
else:
msg = (
"\nGraphBuilder {} could not automatically "
"determine the topology of input field {} in "
"operator {}.\nTry to set a non-empty "
"TopologyDescriptor when passing the variable "
"parameters at operator creation."
"\nAutomatic topology detection is an "
"experimental feature."
)
msg = msg.format(target_node.name, ifield.name, op.name)
raise RuntimeError(msg)
else:
cstate = self.topology_states[ifield]
(itopo, dstate, node, ireqs) = cstate.first_topology_and_dstate
field_requirements.update_inputs({ifield: ireqs})
if itopo.backend.kind not in backends:
backend = itopo.backend.any_backend_from_kind(*backends)
itopo = itopo.topology_like(backend=backend)
ifields[ifield] = itopo
for ofield, otopo in sorted(ofields.items(), key=lambda x: x[0].name):
if otopo is not None:
continue
if (ofield in ifields) and (ifields[ofield] is not None):
ofields[ofield] = ifields[ofield]
elif ofield not in self.topology_states:
msg = (
"\nGraphBuilder {} could not automatically determine "
"the topology of output field {} in operator {}."
"\nTry to set a non-empty TopologyDescriptor when "
"passing the variable parameters at operator "
"creation.\nAutomatic topology detection is an "
"experimental feature."
)
msg = msg.format(target_node.name, ofield.name, op.name)
raise RuntimeError(msg)
else:
cstate = self.topology_states[ofield]
(otopo, dstate, node, oreqs) = cstate.first_topology_and_dstate
field_requirements.update_outputs({ofield: oreqs})
ofields[ofield] = otopo
# iterate over subgraph operator input parameters
if iparams:
gprint(" >Input parameters")
for iparam in sorted(iparams.keys(), key=lambda x: x.name):
gprint(f" *{iparam.short_description()}")
parameter_handler.handle_input_parameter(iparam, opnode)
if iparam.name not in output_params:
input_params[iparam] = iparams[iparam]
# iterate over subgraph operator output parameters
if oparams:
gprint(" >Output parameters")
for oparam in sorted(oparams.keys(), key=lambda x: x.name):
gprint(f" *{oparam.short_description()}")
parameter_handler.handle_output_parameter(oparam, opnode)
output_params[oparam] = oparams[oparam]
# iterate over subgraph operator input fields
input_states = {}
if ifields:
gprint(" >Input fields")
for ifield, itopo in sorted(
ifields.items(), key=lambda x: x[0].name, reverse=True
):
gprint(
" *{}{}".format(
ifield.name,
(
" on an unknown topology"
if (itopo is None)
else f".{itopo.pretty_tag}"
),
)
)
if itopo is None:
assert isinstance(op, RedistributeInter)
continue
if isinstance(op, Problem):
if ifield in op.initial_input_topology_states.keys():
ifreqs = op.initial_input_topology_states[ifield][0]
else:
ifreqs = None
else:
if current_level != 0 or isinstance(op, Problem):
ifreqs = None
else:
ifreqs = field_requirements.get_input_requirement(ifield)[1]
if ifield not in self.topology_states:
cstate = self.new_topology_state(ifield)
self.topology_states[ifield] = cstate
is_new = True
else:
cstate = self.topology_states[ifield]
is_new = False
dstate = cstate.handle_input(opnode, itopo, ifreqs, graph, is_new)
input_states[ifield] = dstate
if is_new:
input_fields[ifield] = itopo
input_topology_states[ifield] = (ifreqs, dstate)
if ifield not in self._double_check_inputs:
self._double_check_inputs[ifield] = {}
self._double_check_inputs[ifield].update({itopo: (ifreqs, dstate)})
# iterate over subgraph operator output fields
output_states = {}
if ofields:
gprint(" >Output fields")
for ofield, otopo in sorted(
ofields.items(), key=lambda x: x[0].name, reverse=True
):
gprint(
" *{}{}".format(
ofield.name,
(
" on an unknown topology"
if (otopo is None)
else f".{otopo.pretty_tag}"
),
)
)
if otopo is None:
assert isinstance(op, RedistributeInter)
continue
if isinstance(op, Problem):
if ofield in op.final_output_topology_states.keys():
ofreqs = op.final_output_topology_states[ofield][0]
else:
ofreqs = None
else:
ofreqs = (
None
if (current_level != 0)
else field_requirements.get_output_requirement(ofield)[1]
)
istates = None if (current_level != 0) else input_states
cstate = self.topology_states.setdefault(
ofield, self.new_topology_state(ofield)
)
invalidate_field = ofield not in op.get_preserved_input_fields()
dstate = cstate.handle_output(
opnode,
otopo,
ofreqs,
op,
istates,
invalidate_field,
graph,
node_list=target_node.nodes,
)
output_fields[ofield] = otopo
output_states[ofield] = dstate
output_topology_states[ofield] = (None, dstate)
if (current_level == 0) and ((op, opnode) not in self._deferred_operators):
opnode.set_op_info(op, input_states, output_states)
op_input_topology_states[op] = input_states
op_output_topology_states[op] = output_states
def __find_elements_to_redistribute(available_elems, needed_elems):
# The algorithm extracts the level-0 input fields and topologies as needs
# and matches them against the output fields and topologies as provided. The key
# feature is that this information is distributed across distinct tasks (sub-communicators).
# The same algorithm is also used for parameters.
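# Matching protocol (as implemented below): each task's root process exchanges
# the lists of needed and providable names with the other task roots (point-to-point
# messages with tags 4321 and 1234), resolves which task provides or receives each
# name, broadcasts the resulting maps over its task communicator, and finally yields
# one keyword-argument dict per RedistributeInter / RedistributeInterParam to create.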
domain = first_not_None(
[
_.domain
for _ in set(available_elems.values()).union(
set(needed_elems.values())
)
if hasattr(_, "domain")
]
)
comm = domain.parent_comm
current_tasks = domain.current_task_list()
def _name_to_key(n, d):
var = [_ for _ in d.keys() if isinstance(_, str) and _ == n]
var += [_ for _ in d.keys() if not isinstance(_, str) and _.name == n]
if len(var) == 1:
return var[0]
return None
# Find redistribute candidates
available_names = {
_.name for _ in available_elems.keys()
} - self._intertasks_exchanged
needed_names = {
_.name for _ in needed_elems.keys()
} - self._intertasks_exchanged
msg = " >[IT] Current task ({}) {} parameters and fields: {}"
gprint(
msg.format(current_tasks, "can communicate", ", ".join(available_names))
)
gprint(msg.format(current_tasks, "needs", ", ".join(needed_names)))
# Inter-task matching is performed on each task's root process
available_names = {_: None for _ in available_names} # value is dest task
needed_names = {_: None for _ in needed_names} # value is src task
for current_task in current_tasks:
if domain.task_root_in_parent(current_task) == domain.parent_rank:
msg = ""
# loop over other tasks
for ot in (_ for _ in domain.all_tasks if _ != current_task):
if domain.task_root_in_parent(ot) == domain.parent_rank:
ot_needs = []
for _n in needed_names.keys():
_ntopo = needed_elems[_name_to_key(_n, needed_elems)]
if (
hasattr(_ntopo, "task_id")
and _ntopo.task_id == current_task
):
continue
else:
ot_needs.append(_n)
can_provide = [_ for _ in ot_needs if _ in available_names]
to_remove = []
for prov in can_provide:
ae = available_elems[
_name_to_key(prov, available_elems)
]
ne = needed_elems[_name_to_key(prov, needed_elems)]
if ae.task_id != ot and ne.task_id == ot:
available_names[prov] = ne.task_id
needed_names[prov] = ae.task_id
else:
to_remove.append(prov)
for rm in to_remove:
can_provide.remove(rm)
available_names[rm] = None
needed_names[rm] = None
else:
comm.isend(
list(needed_names.keys()),
dest=domain.task_root_in_parent(ot),
tag=4321,
)
ot_needs = comm.recv(
source=domain.task_root_in_parent(ot), tag=4321
)
can_provide = [_ for _ in ot_needs if _ in available_names]
for prov in can_provide:
available_names[prov] = ot
ae = available_elems[
_name_to_key(prov, available_elems)
]
assert ot != ae.task_id
comm.isend(
can_provide,
dest=domain.task_root_in_parent(ot),
tag=1234,
)
ot_provide = comm.recv(
source=domain.task_root_in_parent(ot), tag=1234
)
for _op in ot_provide:
needed_names[_op] = ot
ne = needed_elems[_name_to_key(_op, needed_elems)]
assert ot != ne.task_id
if len(ot_needs) > 0:
msg += "\n *Other task {} needs init for {}, we provide {}".format(
ot,
ot_needs,
"nothing" if len(can_provide) == 0 else can_provide,
)
if msg != "":
gprint(" >[IT] Inter-tasks matching:" + msg)
needed_names = {p: t for (p, t) in needed_names.items() if t is not None}
available_names = {
p: t for (p, t) in available_names.items() if t is not None
}
for current_task in current_tasks:
tcomm = domain.get_task_comm(current_task)
needed_names = tcomm.bcast(needed_names, root=0)
available_names = tcomm.bcast(available_names, root=0)
final_needed_names, final_available_names = {}, {}
for current_task in current_tasks:
_tmp_needed = dict(
(k, v) for k, v in needed_names.items() if v != current_task
)
_tmp_avail = dict(
(k, v) for k, v in available_names.items() if v != current_task
)
final_needed_names.update(_tmp_needed)
final_available_names.update(_tmp_avail)
needed_names, available_names = final_needed_names, final_available_names
gprint(
f" >[IT] Inter-tasks will send:to {available_names} and recieve:from {needed_names}"
)
# Get back the actual field or parameter
names_to_obj = {}
for p in available_names.keys():
names_to_obj[p] = _name_to_key(p, available_elems)
for p in needed_names.keys():
names_to_obj[p] = _name_to_key(p, needed_elems)
# group parameters with same other task
allp = []
tasks_to_name = {}
for p in sorted(
set(available_names.keys()).union(set(needed_names.keys()))
):
t = (
available_names[p]
if p in available_names.keys()
else needed_names[p]
)
if isinstance(names_to_obj[p], ScalarField):
allp.append(
[
p,
]
)
else:
if t in tasks_to_name:
tasks_to_name[t].append(p)
else:
tasks_to_name[t] = [
p,
]
for params in tasks_to_name.values():
allp.append(params)
for p in sorted(allp):
kwargs = {}
s_topo, r_topo, comm_dir = (None,) * 3
var = tuple(names_to_obj[_] for _ in p)
if p[0] in available_names:
t = available_names[p[0]]
topo = available_elems[var[0]]
comm_dir = "src"
s_topo = topo
if p[0] in needed_names:
t = needed_names[p[0]]
topo = needed_elems[var[0]]
comm_dir = "dest"
r_topo = topo
if not (s_topo is None or r_topo is None):
comm_dir = "src&dest"
t = None
assert comm_dir is not None
opname = "RI{}_{}{}{}_{}".format(
comm_dir,
"" if s_topo is None else s_topo.task_id,
"to" if not s_topo is None and not r_topo is None else "",
"" if r_topo is None else r_topo.task_id,
",".join(_.name for _ in var),
)
# Finalize init call
kwargs.update(
{
"name": opname,
"source_topo": s_topo,
"target_topo": r_topo,
"other_task_id": t,
}
)
if isinstance(var[0], ScalarField):
kwargs.update(
{
"variable": var[0],
"mpi_params": topo.mpi_params,
}
)
else:
kwargs.update(
{
"parameter": var,
"domain": domain,
}
)
yield kwargs
# iterate over ComputationalNodes
node_id = 0
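# Iterate over a copy of the node list: handling RedistributeInter nodes below may
# insert new nodes into (or remove fake ones from) target_node.nodes while iterating.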
for node in [_ for _ in target_node.nodes]:
gprint(
"\n >Handling node {}::{}: {} {}".format(
self.target_node.name, node_id, node.name, node.__class__
)
)
# Recursively build the graph.
# If the current node is a ComputationalGraph, we first have to
# build its own local graph, then extract all its operators (graph nodes).
# Otherwise, if the node is a ComputationalGraphOperator, we just take the
# current node's operator.
subgraph, node_ops, node_vertices, from_subgraph = self.build_subgraph(
node, current_level
)
# iterate over subgraph operators
for opvertex, op in zip(node_vertices, node_ops):
# add operator node and fill vertex properties
opnode = self.new_node(
op, subgraph, current_level, node, node_id, opvertex
)
if isinstance(node, RedistributeInter):
assert self.search_intertasks_ops
gprint(f" >[IT] Handling node {node_id}")
available_elems, needed_elems = {}, {}
for _node in target_node.nodes:
if _node is node:
break
available_elems.update(_node.output_fields)
available_elems.update(_node.output_params)
for _node in target_node.nodes[::-1]:
if _node is node:
break
needed_elems.update(_node.input_fields)
needed_elems.update(_node.input_params)
for it_redistribute_kwargs in __find_elements_to_redistribute(
available_elems, needed_elems
):
if "variable" in it_redistribute_kwargs.keys():
assert RedistributeInter.can_redistribute(
*tuple(
it_redistribute_kwargs[_]
for _ in (
"source_topo",
"target_topo",
"other_task_id",
)
)
), str(it_redistribute_kwargs)
if op.fake_init:
op.__init__(**it_redistribute_kwargs)
# Recompute field requirements since no fields were given when the fake operator was first created
first_op, first_opnode = op, opnode
else:
if "variable" in it_redistribute_kwargs.keys():
op = RedistributeInter(**it_redistribute_kwargs)
else:
op = RedistributeInterParam(**it_redistribute_kwargs)
target_node.nodes.insert(
target_node.nodes.index(first_op), op
)
gprint(
"\n >Handling node {}::{}: {} {} :: {}".format(
self.target_node.name,
node_id,
op.name,
op.__class__,
it_redistribute_kwargs,
)
)
subgraph, node_ops, node_vertices, from_subgraph = (
self.build_subgraph(op, current_level)
)
opvertex = node_vertices[0]
opnode = new_vertex(graph, op)
if isinstance(op, RedistributeInter):
cstate = self.topology_states.setdefault(
op.variable, self.new_topology_state(op.variable)
)
node = op
if isinstance(op, RedistributeInter):
op.initialize(topgraph_method=self.target_node.method)
op.get_and_set_field_requirements()
__handle_node(
node_id,
node,
subgraph,
node_ops,
node_vertices,
from_subgraph,
opvertex,
op,
opnode,
)
node_id += 1
if isinstance(op, RedistributeInter) and op.fake_init:
# Delete node because nothing has to be exchanged
target_node.nodes.remove(op)
graph.remove_node(opnode)
else:
__handle_node(
node_id,
node,
subgraph,
node_ops,
node_vertices,
from_subgraph,
opvertex,
op,
opnode,
)
node_id += 1
# On level=0 we print a summary (if asked) for input and output fields and
# their topology.
def _print_io_fields_params_summary(comment=""):
msg = f"\nComputationalGraph {target_node.name} inputs {comment}:\n"
if not self.input_fields and not self.input_params:
msg += " no inputs\n"
else:
if self.input_fields:
for ifield in sorted(self.input_fields, key=lambda x: x.name):
itopo = self.input_fields[ifield]
_, ireqs = self.input_topology_states[ifield]
msg += " *Field {} on topo {}{}\n".format(
ifield.name,
itopo.id,
f": {ireqs}" if GRAPH_BUILDER_DEBUG_LEVEL == 2 else "",
)
if len(self.input_params) > 0:
for iparam in sorted(ip.name for ip in self.input_params):
msg += f" *Parameter {iparam}\n"
msg += f"ComputationalGraph {target_node.name} outputs {comment}:\n"
if not self.output_fields and not self.output_params:
msg += " no outputs\n"
else:
if self.output_fields:
for ofield in sorted(self.output_fields, key=lambda x: x.name):
otopo = self.output_fields[ofield]
_, oreqs = self.output_topology_states[ofield]
msg += " *Field {} on topo {}{}\n".format(
ofield.name,
otopo.id,
f": {oreqs}" if GRAPH_BUILDER_DEBUG_LEVEL == 2 else "",
)
if len(self.output_params) > 0:
for oparam in sorted(op.name for op in self.output_params):
msg += f" *Parameter {oparam}\n"
msg += "\n"
gprint(msg)
if current_level == 0:
_print_io_fields_params_summary()
is_graph_updated = False
# iterate deferred nodes
for op, opnode in self._deferred_operators:
gprint(f" >Handling deferred node {op.name}")
ifields = op.input_fields
input_states = op_input_topology_states[op]
output_states = op_output_topology_states[op]
field_requirements = op.field_requirements
for ifield, itopo in sorted(ifields.items(), key=lambda x: x[0].name):
if itopo is not None:
continue
msg = "\nGraphBuilder {} could not automatically determine the "
msg += "topology of input field {} in operator {}."
msg += "\nTry to set a non empty TopologyDescriptor when passing "
msg += "the variable parameters, when creating the operator."
msg += "\nAutomatic topology detection is an experimental feature."
msg = msg.format(target_node.name, ifield.name, op.name)
if ifield not in self.topology_states:
raise RuntimeError(msg)
cstate = self.topology_states[ifield]
if cstate.first_topology_and_dstate is None:
raise RuntimeError(msg)
(itopo, dstate, node, ireqs) = cstate.first_topology_and_dstate
ifields[ifield] = itopo
input_states[ifield] = dstate
field_requirements.update_inputs({ifield: ireqs})
cstate.add_edge(graph, opnode, node, ifield, itopo)
if current_level == 0:
opnode.set_op_info(op, input_states, output_states)
# add output field Writer if necessary
if target_node._output_fields_to_dump is not None:
from hysop.operator.hdf_io import HDF_Writer
for fields, io_params, op_kwds in target_node._output_fields_to_dump:
if not fields:
fields = self.output_fields.keys()
fields = tuple(sorted(fields, key=lambda x: x.name))
for field in fields:
msg = f"{field.name} is not an output field."
assert field in self.output_fields, msg
target_topo = self.output_fields[field]
variables = {field: target_topo}
io_params = io_params.clone(
filename=f"{io_params.filename}_{field.name}_out"
)
op = HDF_Writer(io_params=io_params, variables=variables, **op_kwds)
op.initialize(topgraph_method=self.target_node.method)
op.get_and_set_field_requirements()
opnode = self.new_node(
op, None, current_level, None, None, None
)
ifreqs = (
None
if (current_level != 0)
else field_requirements.get_input_requirement(field)[1]
)
cstate = self.topology_states[field]
state = cstate.handle_input(
opnode, target_topo, ifreqs, graph, False
)
input_states = {field: state}
output_states = {}
self.op_input_topology_states[op] = input_states
self.op_output_topology_states[op] = output_states
if current_level == 0:
opnode.set_op_info(op, input_states, output_states)
is_graph_updated = True
# Alter states such that output topology states match input topology states
# this is only done if required (outputs_are_inputs) and if we are
# processing the top level (root) graph
if (current_level == 0) and outputs_are_inputs:
def _closure(field, itopo, itopostate, cstate):
target_topo = itopo
input_dfield_requirements, input_topology_state = itopostate
requirements = input_dfield_requirements.copy()
requirements.axes = (input_topology_state.axes,)
requirements.memory_order = input_topology_state.memory_order
cstate.output_as_input(target_topo, requirements, graph)
# Update (on closure) to have output as close as possible to inputs
if field in output_fields:
orig_topo, orig_state = (
output_fields[field],
cstate.discrete_topology_states[output_fields[field]],
)
final_topo, final_state = (
target_topo,
cstate.discrete_topology_states[target_topo],
)
kept_topo_and_state = (
orig_topo == final_topo and orig_state == final_state
)
if not kept_topo_and_state:
msg = " > Update graph outputs {} from topology {}{} to {}{}".format(
field.name,
orig_topo.tag,
f":{orig_state}" if GRAPH_BUILDER_DEBUG_LEVEL == 2 else "",
final_topo.tag,
f":{final_state}" if GRAPH_BUILDER_DEBUG_LEVEL == 2 else "",
)
gprint(msg)
self.output_fields[field] = target_topo
self.output_topology_states[field] = (
None,
cstate.discrete_topology_states[target_topo],
)
return True
return False
# identify variables that need a closure
redistribute_fields = set(input_fields.keys())
for field in sorted(redistribute_fields, key=lambda x: x.name):
assert field in input_topology_states
is_graph_updated = _closure(
field,
input_fields[field],
input_topology_states[field],
self.topology_states[field],
)
for f in sorted(self._double_check_inputs.keys(), key=lambda x: x.name):
# every field used as an input must have been written on each topology it is read from, or never written at all
written_topos = set(self.topology_states[f].write_nodes.keys())
read_topos = set(self._double_check_inputs[f].keys())
diff = read_topos - written_topos
if len(written_topos) > 0 and len(diff) >= 0:
for t in diff:
if (
f in self.output_fields
and self.output_fields[f].mpi_params.task_id
!= t.mpi_params.task_id
):
print(
"WARNING FOR MIXED-TASK DOUBLE CHECK for",
f.name,
(self.output_fields[f], t),
)
else:
assert f in input_topology_states
is_graph_updated = _closure(
f,
t,
self._double_check_inputs[f][t],
self.topology_states[f],
)
# Final intertask redistributes as closure
if self.search_intertasks_ops and (current_level == 0) and outputs_are_inputs:
gprint(" >[IT] Inter-task closure searching")
available_elems, needed_elems = {}, {}
needed_elems.update(self.input_fields)
available_elems.update(self.output_fields)
needed_elems.update(self.input_params)
available_elems.update(self.output_params)
# Find redistribute candidates
for it_redistribute_kwargs in __find_elements_to_redistribute(
available_elems, needed_elems
):
if it_redistribute_kwargs:
if "variable" in it_redistribute_kwargs.keys():
node = RedistributeInter(**it_redistribute_kwargs)
else:
node = RedistributeInterParam(**it_redistribute_kwargs)
node_id = len(target_node.nodes)
target_node.push_nodes(node)
if isinstance(node, RedistributeInter):
node.initialize()
gprint(
" >Handling node {}: {} {}".format(
node_id, node.name, node.__class__
)
)
subgraph, node_ops, node_vertices, from_subgraph = (
self.build_subgraph(node, current_level)
)
for opvertex, op in zip(node_vertices, node_ops):
opnode = self.new_node(
op, subgraph, current_level, node, node_id, opvertex
)
__handle_node(
node_id,
node,
subgraph,
node_ops,
node_vertices,
from_subgraph,
opvertex,
op,
opnode,
)
is_graph_updated = True
if current_level == 0 and is_graph_updated:
_print_io_fields_params_summary("After closure and output dumping")
# Check that the generated graph is a directed acyclic graph
if not is_directed_acyclic_graph(graph):
msg = "\nGenerated operator graph is not acyclic."
raise RuntimeError(msg)
# Transitive reduction of graph
# This removes parallel and unnecessary transitive edges
# i.e. remove useless redundant dependencies
reduced_graph = transitive_reduction(graph)
# Lexicographical topological sort
# => find out operator order for execution purposes
# => the order has to be exactly the same on each MPI process.
sorted_nodes = lexicographical_topological_sort(reduced_graph)
for i, node in enumerate(sorted_nodes):
node.op_ordering = i
# Command queues (each color represents a command queue)
# i.e. try to find out data-independent subgraphs
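# Greedy coloring (a sketch of the loop below): starting from each still uncolored
# node in topological order, follow the chain of uncolored children with smallest
# op_ordering; the chain reuses the color of an existing queue only if a path in the
# reduced graph links that queue's last node to the chain's first node, otherwise a
# new color is allocated.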
color = 0
queues = {}
for node in sorted_nodes:
if node.command_queue is not None:
continue
nodes = [node]
uncolored_childs = tuple(
filter(lambda n: n.command_queue is None, reduced_graph.adj[node])
)
while len(uncolored_childs) > 0:
vid = np.argmin([n.op_ordering for n in uncolored_childs])
node = uncolored_childs[vid]
nodes.append(node)
uncolored_childs = tuple(
filter(lambda n: n.command_queue is None, reduced_graph.adj[node])
)
idx_range = (nodes[0].op_ordering, nodes[-1].op_ordering)
if queues:
qkeys = tuple(sorted(queues.keys()))
color = qkeys[-1] + 1
for k in qkeys[::-1]:
paths = queues[k]
if paths[-1][1] < idx_range[0]:
src = sorted_nodes[paths[-1][1]]
dst = sorted_nodes[idx_range[0]]
all_paths = all_simple_paths(reduced_graph, src, dst)
if len(all_paths) > 0:
color = k
break
queues.setdefault(color, []).append(idx_range)
for node in nodes:
node.command_queue = color
self.reduced_graph = reduced_graph
self.sorted_nodes = sorted_nodes
self.nodes = tuple(map(lambda x: x.operator, sorted_nodes))
def build_subgraph(self, node, current_level, **kwds):
node_ops = []
node_vertices = []
subgraph = None
from_subgraph = False
if isinstance(node, RedistributeInter):
node_operators = node.operators()
node_ops.extend(node_operators)
node_vertices += [None] * len(node_operators)
elif isinstance(node, RedistributeInterParam):
node_ops.extend(
[
node,
]
)
node_vertices += [
None,
]
elif node.mpi_params is None or node.mpi_params.on_task:
if isinstance(node, Problem):
node._build_graph(
current_level=current_level,
outputs_are_inputs=True,
search_intertasks_ops=node.search_intertasks_ops,
**kwds,
)
assert node.graph_built, "Sub-problem should be already built"
assert node.initialized, "Sub-problem should be already initialized"
node_ops.append(node)
node_vertices.append(None)
elif isinstance(node, ComputationalGraph):
node._build_graph(current_level=current_level + 1, **kwds)
node_ordering = node.sorted_nodes
subgraph = node.reduced_graph
from_subgraph = True
for nid in node_ordering:
_node = nid
op = _node.operator
node_vertices.append(_node)
node_ops.append(op)
elif isinstance(node, ComputationalGraphOperator):
node_operators = node.operators()
node_ops.extend(node_operators)
node_vertices += [None] * len(node_operators)
else:
msg = "Unknown node type {}."
raise NotImplementedError(msg.format(node.__class__.__name__))
return subgraph, node_ops, node_vertices, from_subgraph
class __ParameterHandler:
def __init__(self, graph):
self.graph = graph
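# last_write_node maps each parameter to the node that last wrote it;
# reading_nodes maps each parameter to the nodes that have read it since that write.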
self.last_write_node = {}
self.reading_nodes = {}
def add_edge(self, src_node, dst_node, parameter):
if (
(src_node is not None)
and (dst_node is not None)
and (src_node != dst_node)
):
edge = new_edge(self.graph, src_node, dst_node, parameter)
return edge
else:
return None
def handle_input_parameter(self, parameter, opnode):
check_instance(parameter, Parameter)
last_write_node = self.last_write_node.setdefault(parameter, None)
reading_nodes = self.reading_nodes.setdefault(parameter, [])
# add a read dependency on the last node that wrote this parameter
# (inputs must be up to date before the actual call to the operator)
if last_write_node:
self.add_edge(last_write_node, opnode, parameter)
reading_nodes.append(opnode)
def handle_output_parameter(self, parameter, opnode):
check_instance(parameter, Parameter)
last_write_node = self.last_write_node.setdefault(parameter, None)
reading_nodes = self.reading_nodes.setdefault(parameter, [])
if last_write_node:
self.add_edge(last_write_node, opnode, parameter)
for rn in reading_nodes:
self.add_edge(rn, opnode, parameter)
reading_nodes[:] = []
self.last_write_node[parameter] = opnode
class __ContinuousFieldState:
def __init__(
self,
field,
op_input_topology_states,
op_output_topology_states,
input_fields_to_dump,
topgraph_method,
):
# all states are related to this continuous field
self.field = field
# record input and output topology states when automatically generating
# new operators
self.op_input_topology_states = op_input_topology_states
self.op_output_topology_states = op_output_topology_states
self.first_topology_and_dstate = None
self.dump_ifield = None
if input_fields_to_dump is not None:
for fields, io_params, op_kwds in input_fields_to_dump:
if (not fields) or (field in fields):
io_params = io_params.clone(
filename=f"{io_params.filename}_{field.name}_in"
)
self.dump_ifield = (io_params, op_kwds)
break
# dictionary (topology -> node) of the node that last wrote the field on
# that topology; multiple topologies can be up to date at the same time
# after a redistribute operator or after an operator that implements the
# get_preserved_input_fields method.
self.write_nodes = {}
# dictionary (topology -> list of nodes) of nodes currently reading
# field:topo
self.read_nodes = {}
# dictionary (topology -> TopologyState)
self.discrete_topology_states = {}
self.method = topgraph_method
def add_vertex(self, graph, operator):
return new_vertex(graph, operator)
def add_edge(self, graph, src_node, dst_node, field, topology, reverse=False):
if (
(src_node is not None)
and (dst_node is not None)
and (src_node != dst_node)
):
if reverse:
return new_edge(graph, dst_node, src_node, field, topology)
else:
return new_edge(graph, src_node, dst_node, field, topology)
else:
return None
def push_generated_operators(
self, op_generator, op_name_prefix, src_topo, graph
):
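# Chain the generated operators (redistribute / transpose / memory reordering) into
# the graph starting from src_topo: each operator gets a vertex, read/write
# bookkeeping is updated, and the discrete topology state of the destination
# topology is derived from the source state. Returns the final destination topology.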
field = self.field
read_nodes = self.read_nodes
write_nodes = self.write_nodes
dstates = self.discrete_topology_states
op_input_topology_states = self.op_input_topology_states
op_output_topology_states = self.op_output_topology_states
assert op_generator.nodes
for i, op in enumerate(op_generator.nodes):
op.name = f"{op_name_prefix}_{op.name}"
if len(op_generator.nodes) > 1:
op.name += f"__{i}"
op.initialize(topgraph_method=self.method)
assert len(op.input_fields) == 1
assert len(op.output_fields) == 1
assert next(iter(op.input_fields)) == field
assert next(iter(op.output_fields)) == field
assert next(iter(op.input_fields.values())) == src_topo
dst_topo = next(iter(op.output_fields.values()))
op_node = self.add_vertex(graph, op)
# handle input
if src_topo in write_nodes:
src_node = write_nodes[src_topo]
self.add_edge(graph, src_node, op_node, field, src_topo)
# handle output
ro_nodes = read_nodes.setdefault(dst_topo, [])
for ro_node in ro_nodes:
self.add_edge(graph, ro_node, op_node, field, dst_topo)
read_nodes[dst_topo] = []
write_nodes[dst_topo] = op_node
if dst_topo is not src_topo:
read_nodes.setdefault(src_topo, []).append(op_node)
istate = {field: dstates[src_topo]}
dstates[dst_topo] = op.output_topology_state(field, istate)
ostate = {field: dstates[dst_topo]}
assert op not in op_input_topology_states
assert op not in op_output_topology_states
op_input_topology_states[op] = istate
op_output_topology_states[op] = ostate
op_node.set_op_info(op, istate, ostate)
src_node = op_node
src_topo = dst_topo
return dst_topo
def redistribute(self, target_topo, graph, src_topo=None):
field = self.field
write_nodes = self.write_nodes
dstates = self.discrete_topology_states
# field has never been written
if not write_nodes:
return
src_topos = write_nodes.keys()
if src_topo is not None:
assert src_topo in src_topos
src_topos = (src_topo,)
if target_topo in src_topos:
# topology is already up to date with the latest write, nothing to do
return
msg0 = "field {} from up to date topology:"
msg0 += "\n |-{}\n to topology\n |>{}"
msg0 = msg0.format(
field.name,
"\n |-".join(t.short_description() for t in src_topos),
target_topo.short_description(),
)
gprint(f" >Redistribute {msg0}")
# field is out of date on target topology, we should redistribute data
# from another topology
try:
redistribute_generator = Redistribute(
variables=field, source_topos=src_topos, target_topo=target_topo
)
redistribute_generator.generate()
except RedistributeNotImplementedError:
msg = "FATAL ERROR: Graph builder could not find suitable operator on "
msg += "backend {} to redistribute {}"
msg = msg.format(src_topo.backend.kind, msg0)
print(f"\n{msg}\n")
raise
src_topo = redistribute_generator.nodes[0].source_topo
assert src_topo in src_topos
dst_topo = self.push_generated_operators(
redistribute_generator, "R", src_topo, graph
)
assert dst_topo == target_topo
def transpose(self, topo, target_axes, graph):
field = self.field
write_nodes = self.write_nodes
dstates = self.discrete_topology_states
assert topo in dstates
src_state = dstates[topo]
if not target_axes:
return
if src_state.axes in target_axes:
return
msg = (
" >Transpose from state {} to any of those transposition states "
)
msg += "[{},] "
msg = msg.format(
src_state.tstate,
", ".join(
[
str(TranspositionState.axes_to_tstate(axes))
for axes in target_axes
]
),
)
gprint(msg)
def find_permutation(src_axes, dst_axes):
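# Return the index permutation mapping src_axes onto dst_axes, e.g. (hypothetical
# values) find_permutation((1, 0, 2), (2, 0, 1)) gives (2, 1, 0).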
axes = ()
for ai in dst_axes:
aj = src_axes.index(ai)
axes += (aj,)
return axes
candidate_axes = ()
for target in target_axes:
axes = find_permutation(src_state.axes, target)
candidate_axes += (axes,)
try:
transpose_generator = Transpose(
fields=field, variables={field: topo}, axes=candidate_axes
)
transpose_generator.generate()
except TranspositionNotImplementedError:
msg = "FATAL ERROR: Graph builder could not find suitable operator on "
msg += "backend {} to transpose from state {} to any of those "
msg += "transposition states [{},] for field {} on topology id {}."
msg = msg.format(
topo.backend.kind,
src_state.tstate,
", ".join(
[
TranspositionState.axes_to_tstate(axes)
for axes in target_axes
]
),
field.name,
topo.id,
)
print(f"\n{msg}\n")
raise
dst_topo = self.push_generated_operators(
transpose_generator, "T", topo, graph
)
assert dst_topo == topo
def reorder(self, topo, target_memory_order, graph):
field = self.field
write_nodes = self.write_nodes
dstates = self.discrete_topology_states
assert topo in dstates
src_state = dstates[topo]
assert src_state.memory_order is not MemoryOrdering.ANY
if src_state.memory_order == target_memory_order:
return
if target_memory_order is MemoryOrdering.ANY:
return
msg = " >MemoryReordering from memory order {} to memory order {}."
msg = msg.format(src_state.memory_order, target_memory_order)
gprint(msg)
try:
reorder_generator = MemoryReordering(
fields=field,
variables={field: topo},
target_memory_order=target_memory_order,
)
reorder_generator.generate()
except MemoryReorderingNotImplementedError:
msg = "FATAL ERROR: Graph builder could not find suitable operator on "
msg += "backend {} to reorder a field from order {} to order {} "
msg += "for field {} on topology id {}."
msg = msg.format(
topo.backend.kind,
src_state.memory_order,
target_memory_order,
field.name,
topo.id,
)
print(f"\n{msg}\n")
raise
dst_topo = self.push_generated_operators(
reorder_generator, "MR", topo, graph
)
assert dst_topo == topo
def handle_input(
self, opnode, target_topo, target_dfield_requirements, graph, is_new
):
ifield = self.field
write_nodes = self.write_nodes
read_nodes = self.read_nodes
dtopology_states = self.discrete_topology_states
is_root = target_dfield_requirements is not None
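# Only the root graph (current_level == 0) passes actual field requirements;
# subgraphs pass None, so is_root discriminates the two cases.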
dim = target_topo.domain.dim
tid = target_topo.task_id
check_instance(
target_dfield_requirements, DiscreteFieldRequirements, allow_none=True
)
assert (not is_root) or (target_dfield_requirements.field == ifield)
# if this is a new input for the graph, should we create a dumper ?
dump_input = False
dump_input = is_new and (self.dump_ifield is not None)
if dump_input:
from hysop.operator.hdf_io import HDF_Writer
io_params, op_kwds = self.dump_ifield
variables = {ifield: target_topo}
writer_op = HDF_Writer(
io_params=io_params, variables=variables, **op_kwds
)
writer_op.initialize(topgraph_method=self.method)
writer_op.get_and_set_field_requirements()
writer_opnode = self.add_vertex(graph, writer_op)
self.add_edge(graph, writer_opnode, opnode, ifield, target_topo)
# we only handle input field requirements when we are the root graph,
# i.e. target_dfield_requirements is not None.
# We do this because overall graph inputs will determine the initial states
# and the start topologies for each variable.
if is_root:
# check if the field has ever been written
if not write_nodes:
# has it already been read ?
if not read_nodes:
# adapt to this first operator
assert target_topo not in dtopology_states
istate = dtopology_states.setdefault(
target_topo, CartesianTopologyState(dim, tid)
)
if target_dfield_requirements:
allowed_axes = target_dfield_requirements.axes
default_axes = TranspositionState[dim].default_axes()
if (allowed_axes is None) or (default_axes in allowed_axes):
istate.axes = default_axes
else:
istate.axes = allowed_axes[0]
allowed_memory_order = (
target_dfield_requirements.memory_order
)
default_memory_order = self.discrete_topology_states[
target_topo
].memory_order
assert default_memory_order is not MemoryOrdering.ANY
if allowed_memory_order is MemoryOrdering.ANY:
istate.memory_order = default_memory_order
else:
istate.memory_order = allowed_memory_order
gprint2(f" >Initial state set to {istate}")
else:
istate = dtopology_states.setdefault(
target_topo, CartesianTopologyState(dim, tid)
)
gprint2(f" >Input state is {istate}")
target_axes = target_dfield_requirements.axes
target_memory_order = target_dfield_requirements.memory_order
def topology_affinity(candidate_topo):
candidate_state = self.discrete_topology_states[candidate_topo]
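# Heuristic scoring (decreasing powers of ten, so each criterion strictly
# dominates the next ones; the ghost count only breaks remaining ties):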
# discard out-of-task topologies
score = (candidate_topo.task_id != target_topo.task_id) * -10000000
# skip redistribute
score += (candidate_topo is target_topo) * 1000000
# skip multiresolution filter (not automatically handled yet)
score += (
candidate_topo.grid_resolution == target_topo.grid_resolution
).all() * 100000
# skip transpose
score += (
(target_axes is not None)
and (candidate_state.axes in target_axes)
) * 10000
# better bandwidth
score += (candidate_topo.backend is target_topo.backend) * 1000
# better bandwidth
score += (
candidate_topo.backend.kind is target_topo.backend.kind
) * 100
# memory reordering is a noop
score += (
(target_memory_order is not MemoryOrdering.ANY)
and (candidate_state.memory_order is target_memory_order)
) * 1
# penalize number of ghosts
score -= np.prod(candidate_topo.ghosts)
return score
if (target_topo.backend.kind is Backend.HOST) and write_nodes:
# give source topo priority according to topology_affinity
src_topos = write_nodes.keys()
src_topos = tuple(
sorted(src_topos, key=topology_affinity, reverse=True)
)
src_topo = src_topos[0]
if target_topo.mpi_params.task_id != src_topo.mpi_params.task_id:
dtopology_states[target_topo] = src_topo.topology_state
else:
if src_topo is not target_topo:
msg = " >Redistributing field {} from up to date topologies {} "
msg += "to host topology {}."
msg = msg.format(
ifield.name,
" ,".join(t.pretty_tag for t in src_topos),
target_topo.pretty_tag,
)
gprint(msg)
self.transpose(src_topo, target_axes, graph)
self.redistribute(target_topo, graph, src_topo=src_topo)
# we can always reorder target because this is a host topology
self.reorder(target_topo, target_memory_order, graph)
elif (target_topo.backend.kind is Backend.OPENCL) and write_nodes:
# give source topo priority according to topology_affinity
src_topos = write_nodes.keys()
src_topos = tuple(
sorted(src_topos, key=topology_affinity, reverse=True)
)
src_topo = src_topos[0]
if target_topo.mpi_params.task_id != src_topo.mpi_params.task_id:
dtopology_states[target_topo] = src_topo.topology_state
else:
if src_topo is not target_topo:
msg = " >Redistributing field {} from up to date topologies {} "
msg += "to device topology {}."
msg = msg.format(
ifield.name,
" ,".join(t.pretty_tag for t in src_topos),
target_topo.pretty_tag,
)
gprint(msg)
self.reorder(src_topo, target_memory_order, graph)
self.redistribute(target_topo, graph, src_topo=src_topo)
# target is always OpenCL so we transpose here
self.transpose(target_topo, target_axes, graph)
else:
self.transpose(target_topo, target_axes, graph)
self.reorder(target_topo, target_memory_order, graph)
istate = dtopology_states[target_topo]
gprint2(f" >Input state is now {istate}")
else:
istate = None
if dump_input:
input_states = {ifield: istate}
output_states = {}
self.op_input_topology_states[writer_op] = input_states
self.op_output_topology_states[writer_op] = output_states
writer_opnode.set_op_info(writer_op, input_states, output_states)
# add a read dependency on the last node that wrote this field
# (so that inputs are up to date before the actual call to the operator)
if target_topo in write_nodes:
last_write_node = write_nodes[target_topo]
self.add_edge(graph, last_write_node, opnode, ifield, target_topo)
elif (not is_root) and write_nodes:
for node in self.write_nodes.values():
self.add_edge(graph, node, opnode, ifield, target_topo)
read_nodes.setdefault(target_topo, []).append(opnode)
if is_new:
self.first_topology_and_dstate = (
target_topo,
istate,
opnode,
target_dfield_requirements,
)
return istate
def handle_output(
self,
opnode,
output_topo,
oreqs,
operator,
input_topology_states,
invalidate_field,
graph,
node_list=[],
):
ofield = self.field
write_nodes = self.write_nodes
read_nodes = self.read_nodes
dtopology_states = self.discrete_topology_states
is_root = input_topology_states is not None
# add a dependency on the last writing node to prevent
# concurrent write-writes.
if output_topo in write_nodes:
src = write_nodes[output_topo]
check_reverse = src.operator in node_list and operator in node_list
self.add_edge(
graph,
src,
opnode,
ofield,
output_topo,
reverse=check_reverse
and node_list.index(src.operator) > node_list.index(operator),
)
if invalidate_field:
msg = " >Invalidating output field {} on all topologies but {} "
msg += "because is has been freshly written."
msg = msg.format(ofield.name, output_topo.pretty_tag)
gprint(msg)
# add dependencies to all operators that read this field
# to prevent concurrent read-writes.
if output_topo in read_nodes:
for ro_node in read_nodes[output_topo]:
if not ro_node is None:
check_reverse = (
ro_node.operator in node_list and operator in node_list
)
self.add_edge(
graph,
ro_node,
opnode,
ofield,
output_topo,
reverse=check_reverse
and node_list.index(ro_node.operator)
> node_list.index(operator),
)
else:
self.add_edge(graph, ro_node, opnode, ofield, output_topo)
# remove read/write dependencies and states
write_nodes.clear()
dtopology_states.clear()
else:
msg = (
" >Keeping output field {} up to date on all topologies because "
)
msg += "is has been marked as preserved by operator."
msg = msg.format(ofield.name)
gprint(msg)
msg = " >Up to date topologies for field {} are now {}, {}."
msg = msg.format(
ofield.name,
output_topo.pretty_tag,
" ,".join(t.pretty_tag for t in write_nodes),
)
gprint(msg)
# add the operator node as the one that last wrote this field.
# no other operator can be reading it, as this topology has just been written.
read_nodes[output_topo] = []
write_nodes[output_topo] = opnode
if is_root:
if isinstance(operator, Problem):
ostate = operator.final_output_topology_states[ofield][1]
else:
ostate = operator.output_topology_state(
ofield, input_topology_states
)
dtopology_states[output_topo] = ostate
gprint2(f" >Output state is now {ostate}")
else:
ostate = None
if self.first_topology_and_dstate is None:
self.first_topology_and_dstate = (output_topo, ostate, opnode, oreqs)
return ostate
def output_as_input(self, target_topo, dstate, graph):
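# Re-run input handling without an operator node so the field ends up back on the
# requested topology/state; used by the outputs_are_inputs closure in build_graph.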
self.handle_input(None, target_topo, dstate, graph, False)